package org.dragonnova.business.web;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.servlet.http.HttpSession;

import org.dragonnova.business.common.Constants;
import org.dragonnova.business.message.GlobalMessage;
import org.dragonnova.business.message.MessageFactory;
import org.dragonnova.business.message.mq.kafka.KafkaMessageListener;
import org.dragonnova.business.message.mq.kafka.KafkaMessageListener.BatchListener;
import org.dragonnova.business.message.mq.kafka.Listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;

import com.base.pub.util.ObjectUtils;
import com.base.pub.websocket.SimpleWebSocketHandler;
import com.google.common.collect.Sets;

public class TagAlarmWebSocketHandler extends SimpleWebSocketHandler {

	private final static Logger LOGGER = LoggerFactory
			.getLogger(TagAlarmWebSocketHandler.class);

	private static final String ALARM_URL_MAPPING = "webSocket/alarmLog.ws";

	@Autowired
	private KafkaMessageListener messageListener;

	@Autowired
	private MessageFactory messageFactory;

	private Listener alarmBatchListener;

	private volatile boolean isTrigger = false;

	@Override
	public void afterConnectionEstablished(WebSocketSession session)
			throws Exception {
		super.initSession(session);

		Map parameters = (Map) session.getAttributes().get("parameters1");
		this.isTrigger = parameters.get("isTrigger") == null ? false : Boolean
				.valueOf((String) parameters.get("isTrigger"));

		if (alarmBatchListener == null) {
			alarmBatchListener = new AlarmBatchListener();
			messageListener.addListener(alarmBatchListener);
			messageListener.setTrigger(this.isTrigger);
		}
	}

	@Override
	protected void handleTextMessage(WebSocketSession session,
			TextMessage message) throws Exception {
		HttpSession httpSession = (HttpSession) session.getAttributes().get(
				"httpsession");
		final int userId = WebSocketSessionManager.getUserId(httpSession);
		if (userId == -1) {
			// TODO 权限
			return;
		}

		// TODO
	}

	@Override
	public void handleTransportError(WebSocketSession wss, Throwable thrwbl)
			throws Exception {
		LOGGER.warn("websocket  transportError : " + thrwbl.getMessage());
		if (thrwbl instanceof IOException) {
			super.closeSession(wss);
		}
	}

	@Override
	public void afterConnectionClosed(WebSocketSession wss, CloseStatus cs)
			throws Exception {
		LOGGER.warn("websocket connection closed...... " + cs.getCode());
		super.closeSession(wss);
	}

	@Override
	public boolean supportsPartialMessages() {
		return false;
	}

	private boolean sendTagListMessage(final String topic,
			final Collection<Object> data, final Long[] timestamp) {
		// TODO filter tagid userid
		int i = 0;
		List<GlobalMessage> result = new ArrayList<GlobalMessage>();
		try {
			for (Object dObject : data) {
				result.addAll(MessageFactory.transfer(messageFactory, topic,
						(String) dObject, timestamp[i++]));
			}
			sendMessage(getWebSocketSessionManager().packData(
					Constants.RESULT_WARN_INFO, topic, result));
			return true;
		} catch (IOException e) {
			LOGGER.error(ObjectUtils.stackTraceToString(e));
			return false;
		}
	}

	private class AlarmBatchListener implements BatchListener {

		@Override
		public Set<String> getTopics() {
			return Sets.newHashSet(Constants.TOPIC_PAY, Constants.TOPIC_ALARM);
		}

		@Override
		public boolean onMessage(String topic, Collection<Object> data,
				Long[] timestamp) {
			return sendTagListMessage(topic, data, timestamp);
		}

	}
}
